package com.niit.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerTest2 {
    //异步的方式去生产数据：每生产一条信息，会通知生产是否生产成功，成功去执行一些操作，失败也可以执行一些操作
    public static void main(String[] args) throws InterruptedException {
        //1.创建用于连接Kafka的配置 Properties
        Properties props = new Properties();
        //连接Kafka服务器
        props.put("bootstrap.servers","node1:9092");
        //应答模式 为 all
        props.put("acks","all");
        //配置生产数据的 key 进行序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //配置生产数据的 value 进行序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //2.创建一个生产者
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
        for (int i =0 ; i<100000;i++){
            //构建数据对象
            ProducerRecord<String,String> record = new ProducerRecord<>("BD2",null,i+"");
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //1.判断消息是否发送成功
                    if(e == null){
                        //获取主题
                        String topic = recordMetadata.topic();
                        //分区id
                        int partition = recordMetadata.partition();
                        //时间戳
                        long timestamp = recordMetadata.timestamp();
                        //偏移量
                        long offset = recordMetadata.offset();
                        System.out.println("主题："+topic+",分区id："+partition+",时间："+timestamp+",偏移量："+offset);
                    }else{

                        System.out.println("发现异常"+e.getMessage());
                    }
                }
            });
            Thread.sleep(3000);
        }

        producer.close();

    }

}
